Source code for hysop.backend.device.opencl.autotunable_kernels.remesh_dir

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import warnings

from hysop.tools.numpywrappers import npw
from hysop.tools.htypes import check_instance
from hysop.tools.misc import upper_pow2_or_3
from hysop.constants import AutotunerFlags, BoundaryCondition
from hysop.numerics.remesh.remesh import RemeshKernel
from hysop.numerics.remesh.kernel_generator import Kernel
from hysop.fields.cartesian_discrete_field import CartesianDiscreteScalarFieldView

from hysop.backend.device.codegen import CodeGeneratorWarning
from hysop.backend.device.codegen.kernels.directional_remesh import (
    DirectionalRemeshKernelGenerator,
)

from hysop.backend.device.opencl import cl, clTools
from hysop.backend.device.opencl.opencl_array import OpenClArray
from hysop.backend.device.opencl.opencl_autotunable_kernel import (
    OpenClAutotunableKernel,
)
from hysop.backend.device.kernel_autotuner import KernelGenerationError


[docs] class OpenClAutotunableDirectionalRemeshKernel(OpenClAutotunableKernel): """Autotunable interface for directional remeshing kernel code generators."""
[docs] def autotune( self, direction, scalar_cfl, position, scalars_in, scalars_out, is_inplace, remesh_kernel, remesh_criteria_eps, force_atomics, relax_min_particles, hardcode_arrays, **kwds, ): """Autotune this kernel with specified configuration. hardcode_arrays means that array offset and strides can be hardcoded into the kernels as constants. """ check_instance(scalars_in, tuple, values=CartesianDiscreteScalarFieldView) check_instance(scalars_out, tuple, values=CartesianDiscreteScalarFieldView) precision = self.typegen.dtype dim = position.dim if not (1 <= dim <= 3): msg = f"Dimension mismatch {dim}." raise ValueError(msg) if not (0 <= direction < dim): msg = f"Invalid direction {direction}." raise ValueError(msg) if not issubclass(precision, npw.floating): msg = f"Precision is not a npw.floating subtype, got {precision}." raise TypeError(msg) if not isinstance(scalar_cfl, float) or scalar_cfl <= 0.0: msg = f"Invalid scalar_cfl value {scalar_cfl}." raise ValueError(msg) if len(scalars_in) != len(scalars_out): raise ValueError("Unmatched scalars input/output.") if not isinstance(remesh_kernel, RemeshKernel) and not isinstance( remesh_kernel, Kernel ): msg = f"Invalid remesh_kernel type {type(remesh_kernel)}." raise TypeError(msg) if (remesh_criteria_eps is not None) and ( not isinstance(remesh_criteria_eps, float) or (remesh_criteria_eps < 0.0) ): msg = f"Invalid remesh_criteria_eps value {remesh_criteria_eps}." raise ValueError(msg) if not isinstance(force_atomics, bool): msg = "Invalid force_atomics value {}, should be a boolean." msg = msg.format(force_atomics) raise ValueError(msg) if not isinstance(relax_min_particles, bool): msg = "Invalid relax_min_particles value {}, should be a boolean." msg = msg.format(relax_min_particles) raise ValueError(msg) if force_atomics and relax_min_particles: msg = "Cannot relax min particles when force_atomics is set because " msg += "there is no minimum particles to be used when using atomic accumulators." raise ValueError(msg) if position.dtype != precision: # TODO implement mixed precision kernels msg = "Array type for position {} do not match required operator precision {}." msg = msg.format(position.dtype, precision.__name__) raise NotImplementedError(msg) pshape = position.compute_resolution work_size = npw.asarray(pshape, dtype=npw.int32).copy() work_dim = work_size.size group_scalars = tuple(dfield.nb_components for dfield in scalars_in) nfields = len(group_scalars) nscalars = sum(group_scalars) ftype = clTools.dtype_to_ctype(precision) min_s_ghosts = DirectionalRemeshKernelGenerator.scalars_out_cache_ghosts( scalar_cfl, remesh_kernel ) min_nparticles = 2 if relax_min_particles else int(2 * npw.ceil(scalar_cfl)) assert min_s_ghosts >= 1 assert min_nparticles >= 2 name = DirectionalRemeshKernelGenerator.codegen_name( work_dim, remesh_kernel, ftype, min_nparticles, nscalars, remesh_criteria_eps, False, is_inplace, ) for dsin, dsout in zip(scalars_in, scalars_out): if dsin.nb_components != dsout.nb_components: msg = "Components mismatch between input field {} and output field {}, " msg += "got input={}, output={}, cannot remesh." msg = msg.format( dsin.name, dsout.name, dsin.nb_components, dsout.nb_components ) raise ValueError(msg) if (dsin.compute_resolution != pshape).any() or ( dsout.compute_resolution != pshape ).any(): msg = "Resolution mismatch between particles and scalars, " msg += "got input={}, output={} but pshape={}, cannot remesh." msg = msg.format( dsin.compute_resolution, dsout.compute_resolution, pshape ) raise ValueError(msg) if dsout.ghosts[-1] < min_s_ghosts: msg = ( "Given boundary condition implies minimum ghosts numbers to be at " ) msg += "least {} in remeshed direction for output scalar but only {} ghosts " msg += "are present in the grid for output scalar {}." msg = msg.format(min_s_ghosts, dsout.ghosts[-1], dsout.name) raise ValueError(msg) if is_inplace and (dsin.dfield != dsout.dfield): msg = "Remeshing inplace but input and output discrete Field differs " msg += f"for {dsin.name} and {dsout.name}." raise ValueError(msg) if (dsin.dtype != precision) or (dsout.dtype != precision): # TODO implement mixed precision kernels msg = "Array types ({}={}, {}={}) do not match required operator precision {}." msg = msg.format( dsin.name, dsin.dtype, dsout.name, dsout.dtype, precision.__name__ ) raise NotImplementedError(msg) make_offset, offset_dtype = self.make_array_offset() make_strides, strides_dtype = self.make_array_strides( position.dim, hardcode_arrays=hardcode_arrays ) kernel_args = {} known_args = {} isolation_params = {} mesh_info_vars = {} target_args = known_args if hardcode_arrays else kernel_args kernel_args["position_base"] = position.sdata.base_data target_args["position_strides"] = make_strides( position.sdata.strides, position.dtype ) target_args["position_offset"] = make_offset( position.sdata.offset, position.dtype ) mesh_info_vars["position_mesh_info"] = self.mesh_info( "position_mesh_info", position.mesh ) isolation_params["position_base"] = dict( count=position.npoints, dtype=position.dtype, arg_value=position.sbuffer.get(), ) arg_index = 1 + 2 * (1 - hardcode_arrays) if is_inplace: for i, dsinout in enumerate(scalars_in): mi = f"S{i}_inout_mesh_info" mesh_info_vars[mi] = self.mesh_info(mi, dsinout.mesh) for j in range(dsinout.nb_components): prefix = f"S{i}_{j}_inout" kernel_args[prefix + "_base"] = dsinout.data[j].base_data target_args[prefix + "_strides"] = make_strides( dsinout.data[j].strides, dsinout.dtype ) target_args[prefix + "_offset"] = make_offset( dsinout.data[j].offset, dsinout.dtype ) isolation_params[prefix + "_base"] = dict( count=dsinout.data[j].size, dtype=dsinout.dtype, fill=10 * i + j ) arg_index += 1 + 2 * (1 - hardcode_arrays) assert i == nfields - 1 else: for i, dsin in enumerate(scalars_in): mi = f"S{i}_in_mesh_info" mesh_info_vars[mi] = self.mesh_info(mi, dsin.mesh) for j in range(dsin.nb_components): prefix = f"S{i}_{j}_in" kernel_args[prefix + "_base"] = dsin.data[j].base_data target_args[prefix + "_strides"] = make_strides( dsin.data[j].strides, dsin.dtype ) target_args[prefix + "_offset"] = make_offset( dsin.data[j].offset, dsin.dtype ) isolation_params[prefix + "_base"] = dict( count=dsin.data[j].size, dtype=dsin.dtype, fill=10 * i + j ) arg_index += 1 + 2 * (1 - hardcode_arrays) assert i == nfields - 1 for i, dsout in enumerate(scalars_out): mi = f"S{i}_out_mesh_info" mesh_info_vars[mi] = self.mesh_info(mi, dsout.mesh) for j in range(dsout.nb_components): prefix = f"S{i}_{j}_out" kernel_args[prefix + "_base"] = dsout.data[j].base_data target_args[prefix + "_strides"] = make_strides( dsout.data[j].strides, dsout.dtype ) target_args[prefix + "_offset"] = make_offset( dsout.data[j].offset, dsout.dtype ) isolation_params[prefix + "_base"] = dict( count=dsout.data[j].size, dtype=dsout.dtype, fill=0 ) arg_index += 1 + 2 * (1 - hardcode_arrays) assert i == nfields - 1 assert len(kernel_args) == arg_index assert len(known_args) == (2 * hardcode_arrays) * ( 1 + (2 - is_inplace) * nscalars ) assert arg_index == (1 + 2 * (1 - hardcode_arrays)) * ( 1 + (2 - is_inplace) * nscalars ) return super().autotune( name=name, position=position, scalars_in=scalars_in, scalars_out=scalars_out, is_inplace=is_inplace, min_s_ghosts=min_s_ghosts, precision=precision, nscalars=nscalars, group_scalars=group_scalars, remesh_kernel=remesh_kernel, remesh_criteria_eps=remesh_criteria_eps, force_atomics=force_atomics, min_nparticles=min_nparticles, ftype=ftype, scalar_cfl=scalar_cfl, kernel_args=kernel_args, mesh_info_vars=mesh_info_vars, work_dim=work_dim, work_size=work_size, known_args=known_args, hardcode_arrays=hardcode_arrays, offset_dtype=offset_dtype, strides_dtype=strides_dtype, isolation_params=isolation_params, **kwds, )
[docs] def compute_args_mapping(self, extra_kwds, extra_parameters): """ Return arguments mapping which is a dictionnary with arguments names as keys and tuples a values. Tuples should contain (arg_position, arg_type(s)) with arg_position being an int and arg_type(s) a type or tuple of types which will be checked against. """ is_inplace = extra_kwds["is_inplace"] scalars_in = extra_kwds["scalars_in"] scalars_out = extra_kwds["scalars_out"] nscalars = extra_kwds["nscalars"] strides_dtype = extra_kwds["strides_dtype"] offset_dtype = extra_kwds["offset_dtype"] hardcode_arrays = extra_kwds["hardcode_arrays"] args_mapping = {} arg_index = 0 args_mapping["position_base"] = (0, cl.MemoryObjectHolder) arg_index += 1 if not hardcode_arrays: args_mapping["position_strides"] = (1, strides_dtype) args_mapping["position_offset"] = (2, offset_dtype) arg_index += 2 if is_inplace: for i, dsinout in enumerate(scalars_in): for j in range(dsinout.nb_components): prefix = f"S{i}_{j}_inout" args_mapping[prefix + "_base"] = (arg_index, cl.MemoryObjectHolder) arg_index += 1 if not hardcode_arrays: args_mapping[prefix + "_strides"] = ( arg_index + 0, strides_dtype, ) args_mapping[prefix + "_offset"] = (arg_index + 1, offset_dtype) arg_index += 2 else: for i, dsin in enumerate(scalars_in): for j in range(dsin.nb_components): prefix = f"S{i}_{j}_in" args_mapping[prefix + "_base"] = (arg_index, cl.MemoryObjectHolder) arg_index += 1 if not hardcode_arrays: args_mapping[prefix + "_strides"] = ( arg_index + 0, strides_dtype, ) args_mapping[prefix + "_offset"] = (arg_index + 1, offset_dtype) arg_index += 2 for i, dsout in enumerate(scalars_out): for j in range(dsout.nb_components): prefix = f"S{i}_{j}_out" args_mapping[prefix + "_base"] = (arg_index, cl.MemoryObjectHolder) arg_index += 1 if not hardcode_arrays: args_mapping[prefix + "_strides"] = ( arg_index + 0, strides_dtype, ) args_mapping[prefix + "_offset"] = (arg_index + 1, offset_dtype) arg_index += 2 assert len(args_mapping) == arg_index assert arg_index == (1 + 2 * (1 - hardcode_arrays)) * ( 1 + (2 - is_inplace) * nscalars ) return args_mapping
[docs] def compute_parameters(self, extra_kwds): """Register extra parameters to optimize.""" check_instance(extra_kwds, dict, keys=str) ftype = extra_kwds["ftype"] work_dim = extra_kwds["work_dim"] precision = extra_kwds["precision"] force_atomics = extra_kwds["force_atomics"] nparticles_options = [1, 2, 4, 8, 16] use_atomics_options = [True] if force_atomics else [True, False] if True in use_atomics_options: cl_env = self.cl_env msg = None if ftype == "float": if not cl_env.has_extension("cl_khr_local_int32_base_atomics"): msg = "OpenCL device {} does not support int32 atomics " msg += "(cl_khr_local_int32_base_atomics)." msg = msg.format(cl_env.device.name) elif ftype == "double": if not cl_env.has_extension("cl_khr_int64_base_atomics"): msg = "OpenCL device {} does not support int64 atomics " msg += "(cl_khr_int64_base_atomics)." msg = msg.format(cl_env.device.name) else: msg = "Atomic remeshing kernel has not been implemented for " msg += f"{precision} yet." if msg: if force_atomics: msg += "\nParameter force_atomics was set to True, aborting..." raise RuntimeError(msg) else: msg = f"\n{msg}" msg += "\nAtomic version of the remeshing kernel will be disabled." warnings.warn(msg, CodeGeneratorWarning) use_atomics_options.remove(True) autotuner_flag = self.autotuner_config.autotuner_flag if autotuner_flag == AutotunerFlags.ESTIMATE: if True in use_atomics_options: use_atomics_options.pop(False) max_workitem_workload = [1, 1, 1] elif autotuner_flag == AutotunerFlags.MEASURE: max_workitem_workload = [1, 8, 1] elif autotuner_flag == AutotunerFlags.PATIENT: max_workitem_workload = [1, 8, 8] elif autotuner_flag == AutotunerFlags.EXHAUSTIVE: max_workitem_workload = [1, 16, 16] max_workitem_workload = npw.asarray(max_workitem_workload[:work_dim]) extra_kwds["max_work_load"] = max_workitem_workload params = super().compute_parameters(extra_kwds=extra_kwds) params.register_extra_parameter("use_atomics", use_atomics_options) params.register_extra_parameter("nparticles", nparticles_options) return params
[docs] def compute_min_max_wg_size( self, work_bounds, work_load, global_work_size, extra_parameters, extra_kwds ): """Default min and max workgroup size.""" cache_ghosts = extra_kwds["min_s_ghosts"] min_wg_size = npw.ones(shape=work_bounds.work_dim, dtype=npw.int32) min_wg_size[0] = max(min_wg_size[0], 2 * cache_ghosts) max_wg_size = npw.ones(shape=work_bounds.work_dim, dtype=npw.int32) max_wg_size[0] = max(global_work_size[0], min_wg_size[0]) return (min_wg_size, max_wg_size)
[docs] def compute_global_work_size( self, local_work_size, work, extra_parameters, extra_kwds ): gs = super().compute_global_work_size( local_work_size=local_work_size, work=work, extra_parameters=extra_parameters, extra_kwds=extra_kwds, ) gs[0] = local_work_size[0] return gs
[docs] def generate_kernel_src( self, global_work_size, local_work_size, extra_parameters, extra_kwds, tuning_mode, dry_run, ): """Generate kernel name and source code""" # Extract usefull variables ftype = extra_kwds["ftype"] work_dim = extra_kwds["work_dim"] nscalars = extra_kwds["nscalars"] known_args = extra_kwds["known_args"] is_inplace = extra_kwds["is_inplace"] scalar_cfl = extra_kwds["scalar_cfl"] remesh_kernel = extra_kwds["remesh_kernel"] group_scalars = extra_kwds["group_scalars"] mesh_info_vars = extra_kwds["mesh_info_vars"] min_nparticles = extra_kwds["min_nparticles"] remesh_criteria_eps = extra_kwds["remesh_criteria_eps"] # Extract and check parameters nparticles = extra_parameters["nparticles"] use_atomics = extra_parameters["use_atomics"] if (not use_atomics) and (nparticles < min_nparticles): msg = "Insufficient number of particles, min={}, got {}." msg = msg.format(min_nparticles, nparticles) raise KernelGenerationError(msg) # Get compile time OpenCL known variables known_vars = super().generate_kernel_src( global_work_size=global_work_size, local_work_size=local_work_size, extra_parameters=extra_parameters, extra_kwds=extra_kwds, tuning_mode=tuning_mode, dry_run=dry_run, ) known_vars.update(mesh_info_vars) known_vars.update(known_args) # disable periodic-periodic because we exchange ghosts anyways sboundaries = ( BoundaryCondition.NONE, BoundaryCondition.NONE, ) # Generate OpenCL source code codegen = DirectionalRemeshKernelGenerator( typegen=self.typegen, work_dim=work_dim, ftype=ftype, nparticles=nparticles, nscalars=nscalars, sboundary=sboundaries, is_inplace=is_inplace, scalar_cfl=scalar_cfl, remesh_kernel=remesh_kernel, group_scalars=group_scalars, remesh_criteria_eps=remesh_criteria_eps, use_atomics=use_atomics, symbolic_mode=self.symbolic_mode, tuning_mode=tuning_mode, debug_mode=False, known_vars=known_vars, ) kernel_name = codegen.name kernel_src = str(codegen) # Check if cache would fit if local_work_size is not None: self.check_cache(codegen.required_workgroup_cache_size(local_work_size)[2]) return (kernel_name, kernel_src)
[docs] def hash_extra_kwds(self, extra_kwds): """Hash extra_kwds dictionnary for caching purposes.""" kwds = ( "remesh_criteria_eps", "nscalars", "ftype", "is_inplace", "remesh_kernel", "work_size", "known_args", ) return self.custom_hash( *tuple(extra_kwds[kwd] for kwd in kwds), mesh_info_vars=extra_kwds["mesh_info_vars"], )